[NYS2AWS-134] second attempt transactions merger #70

Merged
pvriel merged 21 commits into version-0.x.x from NYS2AWS-134-second-attempt-transactions-merger
Feb 11, 2025

Conversation

pvriel
Contributor

@pvriel pvriel commented Feb 6, 2025

https://xenitsupport.jira.com/browse/NYS2AWS-134

Apart from fetching the transaction IDs (which we simply did not consider), most mechanisms have been parallelized to improve performance. The health processor platform can now process ~3300 transactions/second instead of just 200.

Please note that Sonar is complaining about InterruptedExceptions that are not being rethrown. Rethrowing them is impossible, since the code is executed in the .run() method of a Runnable instance, which cannot declare checked exceptions. Technically speaking, you can use @SneakyThrows (and I considered it), but I think it's better to clearly indicate & handle (as much as possible) the exceptions that can be thrown.
Sonar is also complaining that the ToggleableHealthProcessorPlugin() constructor is public, although this is required.
Sonar is also complaining that I'm implementing the mergeTransactions method with a Runnable instance (using the functional interface mechanism) instead of passing a lambda (seriously?). However, if you do that, the unit tests can't properly mock the lambda.
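
For readers unfamiliar with the InterruptedException remark above, here is a minimal sketch of one common way to handle the exception inside run() when it cannot be rethrown: catch it and restore the thread's interrupt flag. The class name `InterruptAwareWorker` and its fields are hypothetical and are not taken from this PR; the actual handling in the merged code may differ.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical worker (not a class from this PR): it only illustrates
// handling InterruptedException inside Runnable.run().
class InterruptAwareWorker implements Runnable {
    private final BlockingQueue<String> queue;
    // Set to true when the worker was interrupted while waiting on the queue.
    volatile boolean interruptedWhileWaiting = false;

    InterruptAwareWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // take() declares InterruptedException, but run() cannot rethrow it.
            queue.take();
        } catch (InterruptedException e) {
            interruptedWhileWaiting = true;
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        }
    }
}
```

The exception is handled explicitly at the point where it occurs, which matches the preference stated above for indicating and handling exceptions rather than hiding them behind @SneakyThrows.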

Explanation of how the indexing strategy works:

sequenceDiagram
    box Purple Healthprocessor platform
    participant ProcessorService
    participant ThresholdIndexingStrategy
    end
    box Darkblue ThresholdIndexingStrategy internal workings
    participant Shared queue A
    participant ThresholdIndexingStrategyTransactionIdFetcher
    participant Shared queue B
    participant ThresholdIndexingStrategyTransactionIdMerger
    end
    box Darkgreen Alfresco components
    participant SearchTrackingComponent
    end

    
    ProcessorService->>ThresholdIndexingStrategy: .onStart()
    ThresholdIndexingStrategy->>ThresholdIndexingStrategy: Initialize state & progress
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdFetcher: .run() (in separate thread)
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdMerger: .run() (in separate thread(s))
    
    par Fetch TxnIDs
        loop while not stopped
            ThresholdIndexingStrategyTransactionIdFetcher->>SearchTrackingComponent: Fetch a preconfigured amount of transaction IDs for each worker
            SearchTrackingComponent-->>ThresholdIndexingStrategyTransactionIdFetcher: transaction IDs
            ThresholdIndexingStrategyTransactionIdFetcher->>ThresholdIndexingStrategyTransactionIdFetcher: Stop if no transaction IDs are received. Otherwise, divide the transaction IDs for the workers.
            loop foreach worker
                ThresholdIndexingStrategyTransactionIdFetcher->>Shared queue B: Queue batch of transaction IDs.
            end
            ThresholdIndexingStrategyTransactionIdFetcher->>ThresholdIndexingStrategyTransactionIdFetcher: Update state. Also, stop if amount of transactions != amount of workers * worker batch size
        end
        loop foreach background worker
            ThresholdIndexingStrategyTransactionIdFetcher->>Shared queue B: Queue a stop signal.
        end

    and Process TxnIDs
        loop while not stopped
            ThresholdIndexingStrategyTransactionIdMerger->>Shared queue B: Fetch next batch of transaction IDs. 
            Shared queue B-->>ThresholdIndexingStrategyTransactionIdMerger: next batch
            ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Stop if an end signal is received.
            ThresholdIndexingStrategyTransactionIdMerger->>SearchTrackingComponent: Fetch nodes associated with transaction IDs
            SearchTrackingComponent-->>ThresholdIndexingStrategyTransactionIdMerger: nodes
            loop foreach transaction
                opt if the transaction size is not sufficiently large (e.g. has not been merged previously)
                    loop foreach node in transaction
                        opt if the node is a workspace or archive node
                            ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Add the ref. of the node to a temporary cache / bucket
                            opt bucket is full
                                ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Create a copy of the bucket and clear the original one
                                ThresholdIndexingStrategyTransactionIdMerger->>Shared queue A: Queue copy of bucket
                            end
                        end
                    end
                end
            end
        end
        ThresholdIndexingStrategyTransactionIdMerger->>Shared queue A: Queue a stop signal.
    
    and Fetch nodeRefs
        ProcessorService->>ThresholdIndexingStrategy: .getNextNodeIds(ignored value)
        loop while not all background workers have stopped working
            ThresholdIndexingStrategy->>Shared queue A: Fetch next batch of NodeRefs
            Shared queue A-->>ThresholdIndexingStrategy: next batch
            opt next batch is not a stop signal
                ThresholdIndexingStrategy-->>ProcessorService: next batch
            end
            ThresholdIndexingStrategy->>ThresholdIndexingStrategy: Keep track of amount of stopped background workers
            opt all background workers have stopped working
                ThresholdIndexingStrategy-->>ThresholdIndexingStrategy: break
            end
        end
        ThresholdIndexingStrategy-->>ProcessorService: empty batch
    end

    ProcessorService->>ThresholdIndexingStrategy: .onStop()
    ThresholdIndexingStrategy->>ThresholdIndexingStrategy: reset state and progress
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdFetcher: .interrupt() (single thread)
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdMerger: .interrupt() (all threads)

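
The queue handoff in the diagram above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: the class `StopSignalPipeline`, the `STOP` sentinel, and the use of `LinkedBlockingQueue` are all hypothetical, and the merger here simply forwards batches instead of looking up nodes and filling buckets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the "shared queue + stop signal" pattern.
class StopSignalPipeline {
    // Sentinel instance, compared by identity (==), used as the stop signal.
    static final List<Long> STOP = new ArrayList<>();

    static List<Long> run(List<List<Long>> batches, int workers) {
        BlockingQueue<List<Long>> queueB = new LinkedBlockingQueue<>();
        BlockingQueue<List<Long>> queueA = new LinkedBlockingQueue<>();

        // Fetcher: queue every batch, then one stop signal per worker.
        new Thread(() -> {
            try {
                for (List<Long> batch : batches) queueB.put(batch);
                for (int i = 0; i < workers; i++) queueB.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // Mergers: process batches until a stop signal arrives, then pass one on.
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    for (List<Long> b = queueB.take(); b != STOP; b = queueB.take()) {
                        queueA.put(b); // assumption: the real merger builds buckets of node refs here
                    }
                    queueA.put(STOP);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        // Consumer: collect results until every worker has signalled that it stopped.
        List<Long> received = new ArrayList<>();
        int stopped = 0;
        try {
            while (stopped < workers) {
                List<Long> next = queueA.take();
                if (next == STOP) stopped++;
                else received.addAll(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

Counting stop signals, one per worker, lets the consumer know when all background workers are done without sharing any other state between threads.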

Explanation for the healthprocessor plugin:

sequenceDiagram
    box Purple Healthprocessor platform
    participant ProcessorService
    participant SolrUndersizedTransactionsHealthProcessorPlugin
    end
    box Darkblue SolrUndersizedTransactionsHealthProcessorPlugin internal workings
    participant Shared thread pool
    participant Worker thread
    end
    box Darkgreen Alfresco components (or Healthprocessor platform proxies)
    participant AbstractNodeDAOImpl
    participant TransactionHelper
    end

    par Receive new tasks
        ProcessorService->>SolrUndersizedTransactionsHealthProcessorPlugin: .doProcess(set of nodeRefs)
        SolrUndersizedTransactionsHealthProcessorPlugin->>SolrUndersizedTransactionsHealthProcessorPlugin: Update state.
        SolrUndersizedTransactionsHealthProcessorPlugin->>Shared thread pool: Queue new task.
        SolrUndersizedTransactionsHealthProcessorPlugin-->>ProcessorService: return healthy reports for all nodeRefs
    
    and Process old tasks in the background
        Shared thread pool-->>Worker thread: execute in separate thread
        Worker thread->>AbstractNodeDAOImpl: Fetch node IDs for nodeRefs.
        AbstractNodeDAOImpl-->>Worker thread: node IDs
        Worker thread->>TransactionHelper: Start new transaction.
        Worker thread->>TransactionHelper: .getCurrentTransactionId(...)
        TransactionHelper-->>Worker thread: transaction ID
        Worker thread->>AbstractNodeDAOImpl: .touchNodes(transaction ID, node IDs)
        Worker thread->>TransactionHelper: finalize transaction.
        Worker thread->>SolrUndersizedTransactionsHealthProcessorPlugin: Update state.
    end

@pvriel pvriel self-assigned this Feb 6, 2025
@pvriel pvriel changed the base branch from master to version-0.x.x February 6, 2025 08:55
@xenit-eu xenit-eu deleted a comment from sonarqubecloud bot Feb 6, 2025
@pvriel
Contributor Author

pvriel commented Feb 11, 2025

New plugin diagram:

sequenceDiagram
    box Purple Healthprocessor platform
    participant ProcessorService
    participant SolrUndersizedTransactionsHealthProcessorPlugin
    end
    box Darkblue SolrUndersizedTransactionsHealthProcessorPlugin internal workings
    participant Shared thread pool
    participant Worker thread
    end
    box Darkgreen Alfresco components (or Healthprocessor platform proxies)
    participant AbstractNodeDAOImpl
    participant TransactionHelper
    end

    par Receive new tasks
        ProcessorService->>SolrUndersizedTransactionsHealthProcessorPlugin: .doProcess(set of nodeRefs)
        SolrUndersizedTransactionsHealthProcessorPlugin->>SolrUndersizedTransactionsHealthProcessorPlugin: Update state.
        opt Too many queued tasks
            SolrUndersizedTransactionsHealthProcessorPlugin->>SolrUndersizedTransactionsHealthProcessorPlugin: Wait until a previous task has been handled.
        end
        SolrUndersizedTransactionsHealthProcessorPlugin->>Shared thread pool: Queue new task.
        SolrUndersizedTransactionsHealthProcessorPlugin-->>ProcessorService: return healthy reports for all nodeRefs
    
    and Process old tasks in the background
        Shared thread pool-->>Worker thread: execute in separate thread
        Worker thread->>AbstractNodeDAOImpl: Fetch node IDs for nodeRefs.
        AbstractNodeDAOImpl-->>Worker thread: node IDs
        Worker thread->>TransactionHelper: Start new transaction.
        Worker thread->>TransactionHelper: .getCurrentTransactionId(...)
        TransactionHelper-->>Worker thread: transaction ID
        Worker thread->>AbstractNodeDAOImpl: .touchNodes(transaction ID, node IDs)
        Worker thread->>TransactionHelper: finalize transaction.
        Worker thread->>SolrUndersizedTransactionsHealthProcessorPlugin: Update state.
        Worker thread->>SolrUndersizedTransactionsHealthProcessorPlugin: Notify about processed task.
    end
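
The "Too many queued tasks" step added in this diagram is a form of backpressure. A minimal sketch of one way to get that behaviour uses a `Semaphore` in front of a thread pool; the class `BoundedSubmitter`, its parameters, and the choice of `Semaphore` are assumptions for illustration and may not match how the plugin actually waits and notifies.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: limit the number of pending tasks handed to a thread pool.
class BoundedSubmitter {
    private final ExecutorService pool;
    private final Semaphore slots;

    BoundedSubmitter(int threads, int maxPendingTasks) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.slots = new Semaphore(maxPendingTasks);
    }

    // Blocks the caller while maxPendingTasks tasks are still queued or running.
    void submit(Runnable task) throws InterruptedException {
        slots.acquire(); // "Wait until a previous task has been handled."
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    slots.release(); // "Notify about processed task."
                }
            });
        } catch (RuntimeException e) {
            slots.release(); // the task was rejected, so free its slot again
            throw e;
        }
    }

    // Small demo: run the given number of counter increments and report how many completed.
    static int runDemo(int tasks) {
        BoundedSubmitter submitter = new BoundedSubmitter(2, 4);
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) submitter.submit(done::incrementAndGet);
            submitter.pool.shutdown();
            submitter.pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            submitter.pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

Without such a bound, a producer that is faster than the workers keeps growing the pool's internal queue, so blocking in the submitting thread keeps memory use predictable.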

@pvriel
Contributor Author

pvriel commented Feb 11, 2025

New indexing diagram:

sequenceDiagram
    box Purple Healthprocessor platform
    participant ProcessorService
    participant ThresholdIndexingStrategy
    end
    box Darkblue ThresholdIndexingStrategy internal workings
    participant Shared queue A
    participant ThresholdIndexingStrategyTransactionIdFetcher
    participant Shared queue B
    participant ThresholdIndexingStrategyTransactionIdMerger
    end
    box Darkgreen Alfresco components
    participant SearchTrackingComponent
    participant dataSource (through JdbcTemplate)
    end

    
    ProcessorService->>ThresholdIndexingStrategy: .onStart()
    ThresholdIndexingStrategy->>ThresholdIndexingStrategy: Initialize state & progress.
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdFetcher: .run() (in separate thread)
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdMerger: .run() (in separate thread(s))
    
    par Fetch TxnIDs
        loop while not stopped
            ThresholdIndexingStrategyTransactionIdFetcher->>dataSource (through JdbcTemplate): Fetch a preconfigured amount of transaction IDs for each worker.
            dataSource (through JdbcTemplate)-->>ThresholdIndexingStrategyTransactionIdFetcher: transaction IDs
            ThresholdIndexingStrategyTransactionIdFetcher->>ThresholdIndexingStrategyTransactionIdFetcher: Stop if no transaction IDs are received. Otherwise, divide the transaction IDs for the workers.
            loop foreach worker
                ThresholdIndexingStrategyTransactionIdFetcher->>Shared queue B: Queue batch of transaction IDs.
            end
            ThresholdIndexingStrategyTransactionIdFetcher->>ThresholdIndexingStrategyTransactionIdFetcher: Update state. Also, stop if amount of transactions != amount of workers * worker batch size.
        end
        loop foreach background worker
            ThresholdIndexingStrategyTransactionIdFetcher->>Shared queue B: Queue a stop signal.
        end

    and Process TxnIDs
        loop while not stopped
            ThresholdIndexingStrategyTransactionIdMerger->>Shared queue B: Fetch next batch of transaction IDs. 
            Shared queue B-->>ThresholdIndexingStrategyTransactionIdMerger: next batch
            ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Stop if an end signal is received.
            ThresholdIndexingStrategyTransactionIdMerger->>SearchTrackingComponent: Fetch nodes associated with transaction IDs.
            SearchTrackingComponent-->>ThresholdIndexingStrategyTransactionIdMerger: nodes
            loop foreach transaction
                opt if the transaction size is not sufficiently large (e.g. has not been merged previously)
                    loop foreach node in transaction
                        ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Add the ref. of the node to a temporary cache / bucket.
                        opt bucket is full
                            ThresholdIndexingStrategyTransactionIdMerger->>ThresholdIndexingStrategyTransactionIdMerger: Create a copy of the bucket and clear the original one.
                            ThresholdIndexingStrategyTransactionIdMerger->>Shared queue A: Queue copy of bucket.
                        end
                    end
                end
            end
        end
        ThresholdIndexingStrategyTransactionIdMerger->>Shared queue A: Queue a stop signal.
    
    and Fetch nodeRefs
        ProcessorService->>ThresholdIndexingStrategy: .getNextNodeIds(ignored value)
        loop while not all background workers have stopped working
            ThresholdIndexingStrategy->>Shared queue A: Fetch next batch of NodeRefs.
            Shared queue A-->>ThresholdIndexingStrategy: next batch
            opt next batch is not a stop signal
                ThresholdIndexingStrategy-->>ProcessorService: next batch
            end
            ThresholdIndexingStrategy->>ThresholdIndexingStrategy: Keep track of amount of stopped background workers.
            opt all background workers have stopped working
                ThresholdIndexingStrategy-->>ThresholdIndexingStrategy: break
            end
        end
        ThresholdIndexingStrategy-->>ProcessorService: empty batch
    end

    ProcessorService->>ThresholdIndexingStrategy: .onStop()
    ThresholdIndexingStrategy->>ThresholdIndexingStrategy: Reset state and progress.
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdFetcher: .interrupt() (single thread)
    ThresholdIndexingStrategy->>ThresholdIndexingStrategyTransactionIdMerger: .interrupt() (all threads)
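
The fetcher's "divide the transaction IDs for the workers" step and its stop condition can be sketched as below. The class `TransactionIdBatcher` and its method names are hypothetical, and the sketch assumes a positive batch size; how the PR actually splits the IDs is not shown in this conversation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting a fetched page of transaction IDs into worker batches.
class TransactionIdBatcher {
    // Splits ids into consecutive batches of at most batchSize elements (batchSize must be > 0).
    static List<List<Long>> divide(List<Long> ids, int batchSize) {
        List<List<Long>> batches = new ArrayList<>();
        for (int from = 0; from < ids.size(); from += batchSize) {
            int to = Math.min(from + batchSize, ids.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(ids.subList(from, to)));
        }
        return batches;
    }

    // A page that is not completely full is treated as the last one, as in the diagram.
    static boolean isLastPage(int received, int workers, int batchSize) {
        return received != workers * batchSize;
    }
}
```

For example, with 3 workers and a batch size of 2, a page of 5 IDs yields batches of sizes 2, 2, and 1, and `isLastPage(5, 3, 2)` is true because a full page would contain 6 IDs.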


Quality Gate failed

Failed conditions
79.2% Coverage on New Code (required ≥ 80%)
C Reliability Rating on New Code (required ≥ A)


@pvriel pvriel merged commit 49c4188 into version-0.x.x Feb 11, 2025
7 of 8 checks passed
@pvriel pvriel deleted the NYS2AWS-134-second-attempt-transactions-merger branch February 11, 2025 14:20